0%

Spark Dataframe基本操作

Spark Dataframe基本操作

创建DataFrame

1
2
// Load the 2015 flight-summary JSON file into a DataFrame, then register it
// as the temporary view "dfTable" so it can also be queried through Spark SQL.
val df = spark.read.json("/data/flight-data/json/2015-summary.json")

df.createOrReplaceTempView("dfTable")
1
2
3
4
5
6
7
8
9
// Build a DataFrame by hand: an explicit three-column schema plus one Row.
// NOTE(review): assumes Row and the types in org.apache.spark.sql.types are
// imported elsewhere — confirm.
val myManualSchema = StructType(Seq(
  StructField("some", StringType, nullable = true),
  StructField("col", StringType, nullable = true),
  StructField("names", LongType, nullable = false)
))

// A single record; the null sits in the nullable "col" position.
val myRows = Seq(Row("Hello", null, 1L))
// Distribute the local rows as an RDD, then attach the schema to get a DataFrame.
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
1
// Build a one-row, three-column DataFrame straight from a local collection.
// The row has to be a tuple: Seq("Hello", 2, 1L) is a three-element Seq[Any],
// which has no encoder and could never map onto three named columns.
// NOTE(review): toDF is provided by spark.implicits._ — assumed to be imported
// elsewhere; confirm.
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

Select

1
// Project columns by their string names and display the first two rows.
df.select(
  "col1",
  "col2"
).show(2)
1
2
3
4
5
6
7
8
// Six interchangeable ways of referring to the same column; every argument
// evaluates to a Column.
// NOTE(review): col/column/expr come from org.apache.spark.sql.functions and
// the 'symbol and $"..." forms from spark.implicits._ — assumed imported
// elsewhere; confirm.
df.select(
  df.col("column1"),
  col("column1"),
  column("column1"),
  'column1, // Symbol literal: one leading quote and no closing quote
  $"column1",
  expr("column1")
).show(2)
1
2
3
4
// Keep every existing column and append a boolean column telling whether the
// destination and origin countries are the same.
df.selectExpr(
  "*",
  // The alias is part of the SQL expression, so it belongs inside the string.
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as WithinCountry"
).show(2)

字面量

1
// Select all columns plus a constant literal column named "One".
df.select(
  expr("*"),
  lit(1).as("One")
).show(2)

添加列

1
// Append a column "numberOne" holding the literal 1 in every row.
df.withColumn(
  "numberOne",
  lit(1)
).show(2)
1
// Append a column "withinCountry" computed from a SQL expression that
// compares two other columns.
df.withColumn(
  "withinCountry",
  expr("col1==col2")
).show(2)

重命名列

1
// Rename column "old" to "new" and list the resulting column names.
df.withColumnRenamed(
  "old",
  "new"
).columns

删除列

1
2
3
// Drop "col1" and list the column names that remain.
df.drop(
  "col1"
).columns

// Add "count2": the "count" column cast to the long type.
// The resulting DataFrame is not assigned and no action is invoked on it here.
df.withColumn(
  "count2",
  col("count").cast("long")
)

过滤行

1
2
3
4
// The same row predicate written two ways: as a Column expression passed to
// filter, and as a SQL string passed to where.
df.filter(
  col("count") < 2
).show(2)
df.where(
  "count < 2"
).show(2)

// Chained where calls: each call adds another condition to the query.
// An empty string is not a parseable SQL expression, so each call is given a
// real predicate over columns used elsewhere in this document.
df.where("count < 2").where("DEST_COUNTRY_NAME != ORIGIN_COUNTRY_NAME")

行排序

1
2
// Sort by column names (ascending is the default) and show the result.
df.orderBy(
  "count",
  "DEST_COUNTRY_NAME"
).show()
// Sort with an explicit direction per column and show the first two rows.
df.orderBy(
  desc("count"),
  asc("col2")
).show(2)